深入理解 kafka(二)

第二章 生产者

客户端开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package Kafka.Demo1;

import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class ProducerFastStart {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";

public static void main(String[] args) {
Properties properties = new Properties();
// properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
properties.put("bootstrap.servers",brokerList);
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
// 构造要发送的消息
ProducerRecord<String, String> record = new ProducerRecord<>(topic,"hello,baby");

// 发送消息
try{
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata = future.get();
System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + "-" + recordMetadata.offset());

} catch (Exception e){
e.printStackTrace();
}
producer.close();

}
}

由上述代码可知,在生产者客户端,一个正常的生产逻辑需要具备以下几个步骤:

  1. 配置生产者客户端参数及创建相应的生产者实例。
  2. 构建待发送的消息。
  3. 发送消息。
  4. 关闭生产者实例。

配置生产者客户端参数及创建生产者实例

首先是有三个必须配置的参数,分别是 bootstrap.serverskey.serializervalue.serializer

  • Bootstrap.servers

    该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单。注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上。

  • key.serializer & value.serializer

    broker 端接收的消息必须以字节数组(byte[])的形式存在。在发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换成字节数组。key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名

配置完参数之后,直接创建生产者实例就可以了,注意这里的 KafkaProducer 是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

消息的发送

消息的发送有三种方式:

  1. 发后即忘。也就是直接显式调用 producer.send(record),不关心消息是否正确到达服务器端;
  2. 同步。producer.send(record) 的返回类型为 Future<RecordMetadata>,所以可以使用 future.get() 进行同步,RecordMetadata 对象包含了消息的一些元数据信息,比如当前消息的主题、分区号、分区中的偏移量、时间戳等等。
  3. 异步。如下图所示,直接在 send() 中使用回调函数。

image-20200427211014168

消息在通过 send() 方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。生产者拦截器可以用来在消费消息钱做一些准备工作,然后消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。

拦截器,一般不是必需的。生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka,这是必须的。

分区器的作用就是为消息分配分区。如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。

原理分析

在前面的章节中,我们已经了解了KafkaProducer的具体使用方法,而本节的内容主要是对Kafka 生产者客户端的内部原理进行分析,通过了解生产者客户端的整体脉络可以让我们更好地使用它,避免因为一些理解上的偏差而造成使用上的错误。

整体架构

生产者客户端的整体架构

由前一小结可知,生产者客户端的send()是异步的,说明有两个线程在运行,也就是主线程和send线程。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,ProducerBatch中可以包含一至多个 ProducerRecord。所以我们其实最终发送的消息是 ProducerBatch。

元数据的更新

元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

在上面的第 8 步之前,我们要挑出负载最小的 broker「每个Node在InFlightRequests中还未确认的请求数决定的,未确认的请求越多则认为负载越大」,这样就能够尽快的发出,因为在发出的时候,我们要先获得对应的 partition 在哪个 broker、broker 是哪个ip,所以在发出之前首先要进行元数据的更新,这里我们肯定是先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。

重要的生产者参数

  • acks。这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。

    • acks = 1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应,是消息可靠性和吞吐量的折中方案。
    • acks=0。生产者发送消息之后不需要等待任何服务端的响应,可以获取最大的吞吐量。
    • acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况。

    注意哦,这里的 acks 参数配置的值是一个字符串,而不是整数型。

  • Max.request.size。这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即 1MB。

  • retries和retry.backoff.ms。retries参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。retry.backoff.ms参数用来设定两次重试之间的时间间隔。

  • compression.type。默认为 “none”,压缩可以减少网络传输量,降低网络I/O。

  • request.timeout.ms。这个参数用来配置Producer等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。

总结

本章主要讲述了生产者客户端的具体用法及其整体架构,主要内容包括配置参数的详解、消息的发送方式、序列化器、分区器、拦截器等。

对于KafkaProducer而言,它是线程安全的,我们可以在多线程的环境中复用它,而对于下一章的消费者客户端KafkaConsumer而言,它是非线程安全的,因为它具备了状态。

第三章 消费者

消费者和消费者组的概念

消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

对于消息中间件而言,一般有两种消息投递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。

  • 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

客户端开发

一个正常的消费逻辑需要具备以下几个步骤:

  1. 配置消费者客户端参数及创建相应的消费者实例。
  2. 订阅主题。
  3. 拉取消息并消费。
  4. 提交消费位移。
  5. 关闭消费者实例。

Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package Kafka.Chapter1;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerFastStart {
public static final String brokerList = "localhost:9092";
public static final String topic = "topic-demo";
public static final String groupId = "group.demo";

public static void main(String[] args) {
Properties properties = new Properties();
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("bootstrap.servers",brokerList);
properties.put("group.id",groupId);
KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

// 订阅主题
consumer.subscribe(Collections.singletonList(topic));

// 循环读取消息
while (true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ZERO);
for(ConsumerRecord<String,String> records1 : records){
System.out.println(records1.value());
}
}
}
}

配置消费者客户端参数及创建相应的消费者实例

跟生产者客户端参数差不多,这里多了一个 group.id

  • bootstrap.servers。
  • group.id:消费者隶属的消费组的名称,默认值为“”。如果设置为空,则会报出异常:Exception in thread “main” org.apache.kafka.common.errors.InvalidGroupIdException:The configured groupId is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称。
  • key.deserializer 和 value.deserializer。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中key和value所需反序列化操作的反序列化器,这两个参数无默认值。注意这里必须填写反序列化器类的全限定名,比如示例中的org.apache.kafka.common.serialization.StringDeserializer,单单指定StringDeserializer是错误的。

消息的消费

订阅主题

配置完参数、创建完消费者实例之后,就是选择订阅主题了。集合订阅的方式subscribe(Collection)、正则表达式订阅的方式subscribe(Pattern)和指定分区的订阅方式 assign(Collection)分表代表了三种不同的订阅状态:AUTO_TOPICS、AUTO_PATTERN和USER_ASSIGNED(如果没有订阅,那么订阅状态为NONE)。

  1. 集合订阅的方式subscribe(Collection)、正则表达式订阅的方式subscribe(Pattern)和指定分区的订阅方式 assign(Collection)分表代表了三种不同的订阅状态:AUTO_TOPICS、AUTO_PATTERN和USER_ASSIGNED(如果没有订阅,那么订阅状态为NONE)。
  2. 通过 assign() 方法订阅分区时,是不具备消费者自动均衡的功能的,因为其指定了 TopicPartition。

拉取消息

订阅完主题之后,就要开始拉取消息了。Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。一般就是采用 poll() 进行消息的消费,该方法涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容,后续章节会详细介绍。

修改偏移量

消费完消息之后,就需要修改偏移量了。在每次调用poll()方法时,它返回的是还没有被消费过的消息集,要做到这一点,就需要记录上一次消费时的消费位移。消费位移存储在Kafka内部的主题__consumer_offsets中。

在 Kafka 中,有 自动提交偏移量手动提交偏移量,默认的是自动提交偏移量,,好处就是免去了复杂的位移提交逻辑,坏处就是偏移量的修改交由服务器自身去处理的话会导致重复消费和消息丢失的情况,自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

所以,有的时候,我们需要手动控制位移提交,开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false。手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。同步提交会阻塞消费者线程直到位移提交完成,异步提交不会阻塞,但是很可能导致重复消费的问题,所以一般采用以下的方式:

image-20200428150922171

指定位移消费

在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。还有一个对应的值是”earliest“。

auto.offset.reset参数还有一个可配置的值—“none”,配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常。

到目前为止,我们一直都是使用 poll() 中的逻辑来进行数据的拉取,比如自动修改偏移量、使用 auto.offset.reset 来决定从末尾开始消费,但是我们只能粗粒度的从开头或者末尾进行消费,那有没有一个更细粒度的方法呢?当然有,那就是KafkaConsumer 中的 seek()方法。

1
public void seek(TopicPartition partition,long offset)

seek()方法中的参数partition表示分区,而offset参数用来指定从分区的哪个位置开始消费。seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置。

seek()方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性。seek()方法也为我们提供了将消费位移保存在外部存储介质中的能力,还可以配合再均衡监听器来提供更加精准的消费能力。

多线程实现消费者客户端

KafkaProducer是线程安全的,然而KafkaConsumer却是非线程安全的。

KafkaConsumer 非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。

消息堆积问题的解决措施,简而言之就是使用多线程,具体来说,可以采用三种方式:

  1. 每一个线程对应一个 KafkaConsumer,每一个 KafkaConsumer 对应 topic 中的一个 partition,这些 KafkaConsumer 隶属于同一个消费组,这样能保证同一条消息不被多个 Consumer 消费。好处就是可以不用处理偏移量和顺序控制,因为 partition 内部使按序的,缺点也很明显,就是该多线程的能力受限于分区数,如果需求是所有消息都必须按序,也就是说所有消息都在一个 partition 内,此时,该方法失效;
  2. 为了解决上述问题,也就是在只有一个分区的情况下,如何解决消息堆积。这个就必须通过手动控制offset了,通过 assign()、seek() 等方法解决。好处就是打破了原有的消费线程的个数受分区数的限制,坏处就是这种实现方式对于位移提交和顺序控制的处理会变得十分复杂,实际应用基本不会涉及。
  3. 上述两种方法都是从提高拉取消息的速率考虑的,但是在实际情况中,poll() 拉取消息的速度是相当快的,整体的消费性能瓶颈并不在于此,而是在于处理消息这一块。所以,我们可以在处理消息模块改成多线程的实现方式,但是这样同样会遇到一个问题,就是消息的顺序问题,因为多线程处理消息,原本 partition 的消息是按序拉取的,但是多线程处理完之后可能就不是按顺序的了,所以我们需要进一步进行处理,作者在该书中提出了滑动窗口的解决思路,即如果按序拉取后的正在处理的消息处理完成,将其放到窗口缓存,当全部完成之后窗口才向后滑动,类似于 TCP 中的 GBN 协议和 SR 协议。

## 重要的消费者参数

  • fetch.min.bytes

    该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1(B)。

  • fetch.max.bytes

    与上述对应,默认值为 50 MB。

  • Max.poll.records

    这个参数用来配置Consumer在一次拉取请求中拉取的最大消息数,默认值为500(条)。

  • connections.max.idle.ms

    这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。

  • metadata.max.age.ms
    这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker加入。

Thank you for your accept. mua!
-------------本文结束感谢您的阅读-------------